🎉 source: implementation for mysql cdc #3505
/test connector=source-mysql
@subodh1810 this looks great! a lot of tough stuff to work through here. I think your approach makes a lot of sense.
@@ -383,7 +385,7 @@ public void testEmptyStateIncrementalIdenticalToFullRefresh() throws Exception {
         .collect(Collectors.toList());
   }

-  private ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatalog catalog) {
+  public ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatalog catalog) {
why are you making this public and overriding it?
Ah! I guess I was playing around with something and forgot to revert the change. Will fix this. It's not required.
    "type": "string",
    "order": 5
  },
  "replication_method": {
In Slack you asked about how we make this backwards compatible! I think you can just handle this in the connector code, so that if it finds replication_method is not set, it defaults to STANDARD. This isn't an ideal solution, but it's the best we have right now until we have a better upgrade path for connectors.
@sherifnada do you agree? or is there some other option you'd prefer?
Is it possible to push this configuration out of spec.json entirely and just rely on the sync mode being "cdc" (so introducing a 3rd mode next to full refresh and incremental)? If not, then this approach makes sense. We'll also want to remove this from the list of required parameters in spec.json I think
@cgardens @sherifnada if I understand correctly then this is what you meant right?
private static boolean isCdc(JsonNode config) {
final boolean isCdc = config.hasNonNull("replication_method")
&& ReplicationMethod.valueOf(config.get("replication_method").asText())
.equals(ReplicationMethod.CDC);
return isCdc;
}
We already have this here
https://github.com/airbytehq/airbyte/pull/3505/files#diff-5e69a6c3136273688c785c97c6a40bef79049c430f8aba721b79705fe4079736R150
I think we also want to add a clause where, if replication_method is not set, it assumes it is STANDARD, right?
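The default discussed above could look roughly like the following. This is only a sketch under stated assumptions: it uses a plain `Map<String, String>` in place of the connector's `JsonNode` config so that it runs without Jackson, and `ReplicationMethodResolver` and its two-value enum are hypothetical names, not the PR's actual classes.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the connector's enum; only the two values named in this thread.
enum ReplicationMethod { STANDARD, CDC }

// Hypothetical helper, not part of the PR.
final class ReplicationMethodResolver {

  // Older configs have no replication_method key, so fall back to STANDARD.
  static ReplicationMethod resolve(Map<String, String> config) {
    String raw = config.get("replication_method");
    if (raw == null || raw.isBlank()) {
      return ReplicationMethod.STANDARD;
    }
    return ReplicationMethod.valueOf(raw.trim().toUpperCase(Locale.ROOT));
  }

  static boolean isCdc(Map<String, String> config) {
    return resolve(config) == ReplicationMethod.CDC;
  }
}
```

With this shape, a config saved before the field existed resolves to STANDARD, and an unrecognised value still fails loudly through `valueOf` rather than being silently ignored.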
/**
 * This implementation is is kind of similar to
 * {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} ()}
- * {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)} ()}
+ * {@link io.debezium.relational.history.FileDatabaseHistory#recoverRecords(Consumer)}
LOGGER.info("using CDC: {}", true);
// TODO: Figure out how to set the isCDC of stateManager to true. Its always false
final AirbyteFileOffsetBackingStore offsetManager = initializeState(stateManager);
AirbyteFileDatabaseHistoryStorageOperations dbHistoryStorageManager = initializeDBHistory(
For this variable name, maybe just historyManager or schemaHistoryManager?
  return offsetManager;
}

private AirbyteFileDatabaseHistoryStorageOperations initializeDBHistory(
Would it make sense for this method to just be a static method on AirbyteFileDatabaseHistoryStorageOperations?
    }
  }
} catch (IOException e) {
  throw new RuntimeException(
IllegalStateException?
public DebeziumRecordPublisher(JsonNode config,
                               ConfiguredAirbyteCatalog catalog,
                               AirbyteFileOffsetBackingStore offsetManager,
                               AirbyteFileDatabaseHistoryStorageOperations airbyteFileDatabaseHistoryStorageOperations) {
if you change this variable name in MySqlSource maybe change it here too?
 * function smoothly. Check {@link #persist(CdcState)} To understand more about file, please refer
 * {@link FilteredFileDatabaseHistory}
 */
public class AirbyteFileDatabaseHistoryStorageOperations {
Maybe just AirbyteSchemaHistoryStorage? Or something a little shorter? nbd if you prefer the verbose name.
/test connector=source-mysql
# Conflicts: # airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcPostgresSourceAcceptanceTest.java
/test connector=source-mysql
Looks great!
- I appreciated all the tests you added.
- I appreciated all the helpful javadocs and comments.
I only had very minor readability comments, all of which are nits except for the shouldSignalClose function. There is also one question for my understanding. My suggested changes should be pretty straightforward, so feel free to merge after addressing.
@@ -16,13 +16,16 @@ dependencies {

   implementation 'mysql:mysql-connector-java:8.0.22'
   implementation 'org.apache.commons:commons-lang3:3.11'
+  implementation 'io.debezium:debezium-embedded:1.4.2.Final'
nit: alphabetise
import org.slf4j.LoggerFactory;

/**
 * The record iterator is the consumer (in the producer / consumer relationship with debezium) is
- * The record iterator is the consumer (in the producer / consumer relationship with debezium) is
+ * The record iterator is the consumer (in the producer / consumer relationship with debezium)
 * this signal either when the publisher had not produced a new record for a long time or when it
 * has processed at least all of the records that were present in the database when the source was
 * started. Because the publisher might publish more records between the consumer sending this
 * signal and the publisher acutally shutting down, the consumer must stay alive as long as the
- * signal and the publisher acutally shutting down, the consumer must stay alive as long as the
+ * signal and the publisher actually shutting down, the consumer must stay alive as long as the
 * has processed at least all of the records that were present in the database when the source was
 * started. Because the publisher might publish more records between the consumer sending this
 * signal and the publisher acutally shutting down, the consumer must stay alive as long as the
 * publisher is not closed or if there are any new records for it to process (even if the publisher
- * publisher is not closed or if there are any new records for it to process (even if the publisher
+ * publisher is not closed. Even after the publisher is closed, the consumer will finish processing any produced records before closing.
 * started. Because the publisher might publish more records between the consumer sending this
 * signal and the publisher acutally shutting down, the consumer must stay alive as long as the
 * publisher is not closed or if there are any new records for it to process (even if the publisher
 * is closed).
- * is closed).
  requestClose.call();
}

private boolean shouldSignalClose(ChangeEvent<String, String> event) {
I think it's worth rewriting these conditionals to simplify the boolean logic for readability. something like
private boolean shouldSignalClose(ChangeEvent<String, String> event) {
if (targetFilePosition.isEmpty()) {
return false;
}
String file = Jsons.deserialize(event.value()).get("source").get("file").asText();
int position = Jsons.deserialize(event.value()).get("source").get("pos").asInt();
if (!file.equals(targetFilePosition.get().fileName)) {
return false;
}
if (targetFilePosition.get().position >= position) {
return false;
}
// if not snapshot or is snapshot but last record in snapshot.
return SnapshotMetadata.TRUE != SnapshotMetadata.valueOf(
Jsons.deserialize(event.value()).get("source").get("snapshot").asText()
.toUpperCase());
}
I generally try to avoid a trail of nested closing brackets, since it can get confusing, and return false/errors early to make clear what is happening.
if (targetFilePosition.isPresent()) {
  String file = Jsons.deserialize(event.value()).get("source").get("file").asText();
  int position = Jsons.deserialize(event.value()).get("source").get("pos").asInt();
  if (file.equals(targetFilePosition.get().fileName)) {
Writing the above example made me think about this line. Are we comparing file names here? (Slightly confused, since we are comparing file to fileName.) What happens if the names don't match up? Should we error?
There can be multiple binlog files; we target the file with the latest records as the end point. If the file name doesn't match, then we are still behind the latest file.
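To make the stopping rule in this exchange concrete, here is a self-contained sketch of the guard-clause version. It is an illustration under assumptions: `BinlogTarget` and `CloseSignalSketch` are hypothetical names, the event's file, position and snapshot flag are passed in directly instead of being parsed from the Debezium event JSON, and `midSnapshot` stands for the case where the event's snapshot marker is "true" (as opposed to "false" or "last").

```java
import java.util.Optional;

// Hypothetical value type for the binlog coordinate captured when the sync starts.
final class BinlogTarget {
  final String fileName;
  final long position;

  BinlogTarget(String fileName, long position) {
    this.fileName = fileName;
    this.position = position;
  }
}

// Hypothetical helper mirroring the early-return structure suggested above.
final class CloseSignalSketch {

  static boolean shouldSignalClose(Optional<BinlogTarget> target,
                                   String eventFile,
                                   long eventPosition,
                                   boolean midSnapshot) {
    // No target recorded: never signal close from here.
    if (target.isEmpty()) {
      return false;
    }
    // A different file name means we have not reached the latest binlog file yet.
    if (!eventFile.equals(target.get().fileName)) {
      return false;
    }
    // Same file, but not yet past the target position.
    if (eventPosition <= target.get().position) {
      return false;
    }
    // Past the target: close unless we are still in the middle of a snapshot.
    return !midSnapshot;
  }
}
```

A mismatched file name is treated as "keep going" rather than as an error, which matches the explanation that the reader is simply behind the latest file.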
/**
 * MySQL Debezium connector monitors the database schema evolution over the time and stores the data
 * in database history file. Without this file we can't fetch the records from binlog. We need to
- * in database history file. Without this file we can't fetch the records from binlog. We need to
+ * in a database history file. Without this file we can't fetch the records from binlog. We need to
  return isCdc;
}

static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) {
- static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) {
+ private static boolean shouldUseCDC(ConfiguredAirbyteCatalog catalog) {
@@ -66,6 +208,93 @@ public JsonNode toJdbcConfig(JsonNode config) {
     return Jsons.jsonNode(configBuilder.build());
   }

+  private static boolean isCdc(JsonNode config) {
I usually prefer separating stuff out, but in this case I think returning the expression inline, without the intermediate variable, is better. So:
return config.hasNonNull("replication_method")
&& ReplicationMethod.valueOf(config.get("replication_method").asText())
.equals(ReplicationMethod.CDC);
This is a nit.
/test connector=source-mysql
/publish connector=connectors/source-mysql
What
Issue : #2847
MySQL CDC is built on top of the Debezium MySQL connector. CDC works on the basis of binlog files and positions.
A few things to highlight about the PR:
- We slightly modified the writing logic. Since a single Airbyte connector can only fetch data for one database, we only need to store the schema evolution of tables in that database. To do this filtering we created our own class, FilteredFileDatabaseHistory, which keeps only the schema information related to the database that the connector is syncing. Since we need to store this information, FilteredFileDatabaseHistory lets us reduce the size of the data that we save. AirbyteSchemaHistoryStorage is responsible for persisting and reloading the data: at the end of the sync it reads the contents of the file and saves them in state, and at the beginning of the sync it creates the file and copies in the contents saved in state. You can find more information about this here:
  https://debezium.io/documentation/reference/1.4/operations/debezium-server.html#debezium-source-database-history-class
  https://debezium.io/documentation/reference/development/engine.html#_in_the_code
- snapshot.locking.mode is set to none. This prevents the connector from acquiring any table locks during the snapshot, but it is safe to use if and only if no schema changes are happening while the snapshot is running. Ref:
  https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-snapshots
  https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-snapshot-locking-mode
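The persist-and-reload cycle described for the schema history file can be sketched as below. This is a rough illustration, not the PR's AirbyteSchemaHistoryStorage: the class name `SchemaHistoryStateSketch`, the temp-file location, and the use of a plain `String` as the saved state are all assumptions made so the example stays self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; names and file location are illustrative only.
final class SchemaHistoryStateSketch {

  // Start of sync: create the history file and copy in whatever was saved in state.
  static Path restoreFromState(String savedHistory) {
    try {
      Path file = Files.createTempFile("dbhistory", ".dat");
      if (savedHistory != null) {
        Files.writeString(file, savedHistory, StandardCharsets.UTF_8);
      }
      return file;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // End of sync: read the whole file back so it can be stored in state.
  static String readForState(Path historyFile) {
    try {
      return Files.readString(historyFile, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the whole file travels through state on every sync, keeping only the synced database's schema changes in it directly reduces how much has to be saved.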
Pre-merge Checklist
Recommended reading order
MySqlSource.java#getIncrementalIterators
DebeziumRecordPublisher